/*
 * Copyright 2014-2025 Real Logic Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.agrona;

import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.SPARSE;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.agrona.BitUtil.SIZE_OF_INT;
import static org.agrona.BitUtil.SIZE_OF_LONG;

/**
 * A {@link MarkFile} is used to mark the presence of a running component and to track liveness.
 * <p>
 * The assumptions are: (1) the version field is an int in size, (2) the timestamp field is a long in size,
 * and (3) the version field comes before the timestamp field.
 */
public class MarkFile implements AutoCloseable
{
    /**
     * Special sentinel value used to indicate that mark file activation is in progress.
     * <p>
     * It is stored into the timestamp field when an existing mark file is taken over so that concurrent activation
     * attempts observe a timestamp which can never be older than the timeout.
     */
    public static final long ACTIVATION_IN_PROGRESS_TIMESTAMP = Long.MAX_VALUE;

    // Byte offsets of the version (int) and timestamp (long) fields within the buffer.
    private final int versionFieldOffset;
    private final int timestampFieldOffset;

    // Both files are null when the instance is constructed directly from a mapped or wrapped buffer.
    private final File parentDir;
    private final File markFile;
    // Null when the instance is constructed from an UnsafeBuffer; handed to BufferUtil.free on close.
    private final MappedByteBuffer mappedBuffer;
    private final UnsafeBuffer buffer;

    // Flipped exactly once by close() so that the mapped buffer is freed at most once.
    private final AtomicBoolean isClosed = new AtomicBoolean();

    /**
     * Create a directory and mark file if none present. Checking if an active mark file exists and is active.
     * Old mark file is deleted and recreated if not active.
     * <p>
     * Total length of mark file will be mapped until {@link #close()} is called.
     *
     * @param directory             for the mark file.
     * @param filename              of the mark file.
     * @param warnIfDirectoryExists for logging purposes.
     * @param dirDeleteOnStart      if desired.
     * @param versionFieldOffset    to use for version field access.
     * @param timestampFieldOffset  to use for timestamp field access.
     * @param totalFileLength       to allocate when creating new mark file.
     * @param timeoutMs             for the activity check (in milliseconds).
     * @param epochClock            to use for time checks.
     * @param versionCheck          to use for existing mark file and version field.
     * @param logger                to use to signal progress or null.
     */
    public MarkFile(
        final File directory,
        final String filename,
        final boolean warnIfDirectoryExists,
        final boolean dirDeleteOnStart,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final int totalFileLength,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        validateOffsets(versionFieldOffset, timestampFieldOffset);

        ensureDirectoryExists(
            directory,
            filename,
            warnIfDirectoryExists,
            dirDeleteOnStart,
            versionFieldOffset,
            timestampFieldOffset,
            timeoutMs,
            epochClock,
            versionCheck,
            logger);

        this.parentDir = directory;
        this.markFile = new File(directory, filename);
        this.mappedBuffer = IoUtil.mapNewFile(markFile, totalFileLength);
        this.buffer = new UnsafeBuffer(mappedBuffer);
        this.versionFieldOffset = versionFieldOffset;
        this.timestampFieldOffset = timestampFieldOffset;
    }

    /**
     * Atomically create a new {@link MarkFile} if not present. Checking if an existing {@link MarkFile} is active.
     * Existing {@link MarkFile} is used exclusively if not active. See
     * {@link #mapNewOrExistingMarkFile(File, boolean, int, int, long, long, EpochClock, IntConsumer, Consumer)}
     * documentation about atomicity guarantees and implementation details.
     * <p>
     * Total length of mark file will be mapped until {@link #close()} is called.
     *
     * @param markFile             to use.
     * @param shouldPreExist       or not.
     * @param versionFieldOffset   to use for version field access.
     * @param timestampFieldOffset to use for timestamp field access.
     * @param totalFileLength      to allocate when creating new {@link MarkFile}.
     * @param timeoutMs            for the activity check (in milliseconds).
     * @param epochClock           to use for time checks.
     * @param versionCheck         to use for existing {@link MarkFile} and version field.
     * @param logger               to use to signal progress or null.
     * @see #mapNewOrExistingMarkFile(File, boolean, int, int, long, long, EpochClock, IntConsumer, Consumer)
     */
    public MarkFile(
        final File markFile,
        final boolean shouldPreExist,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final int totalFileLength,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        validateOffsets(versionFieldOffset, timestampFieldOffset);

        this.parentDir = markFile.getParentFile();
        this.markFile = markFile;
        this.mappedBuffer = mapNewOrExistingMarkFile(
            markFile,
            shouldPreExist,
            versionFieldOffset,
            timestampFieldOffset,
            totalFileLength,
            timeoutMs,
            epochClock,
            versionCheck,
            logger);

        this.buffer = new UnsafeBuffer(mappedBuffer);
        this.versionFieldOffset = versionFieldOffset;
        this.timestampFieldOffset = timestampFieldOffset;
    }

    /**
     * Map a pre-existing {@link MarkFile} if one present and is active.
     * <p>
     * Total length of {@link MarkFile} will be mapped until {@link #close()} is called.
     *
     * @param directory            for the {@link MarkFile} file.
     * @param filename             of the {@link MarkFile} file.
     * @param versionFieldOffset   to use for version field access.
     * @param timestampFieldOffset to use for timestamp field access.
     * @param timeoutMs            for the activity check (in milliseconds) and for how long to wait for file to exist.
     * @param epochClock           to use for time checks.
     * @param versionCheck         to use for existing {@link MarkFile} file and version field.
     * @param logger               to use to signal progress or null.
     */
    public MarkFile(
        final File directory,
        final String filename,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        validateOffsets(versionFieldOffset, timestampFieldOffset);

        this.parentDir = directory;
        this.markFile = new File(directory, filename);
        this.mappedBuffer = mapExistingMarkFile(
            markFile, versionFieldOffset, timestampFieldOffset, timeoutMs, epochClock, versionCheck, logger);
        this.buffer = new UnsafeBuffer(mappedBuffer);
        this.versionFieldOffset = versionFieldOffset;
        this.timestampFieldOffset = timestampFieldOffset;
    }

    /**
     * Manage a {@link MarkFile} on top of an already mapped file using the given version and timestamp offsets.
     * <p>
     * No file or parent directory is associated with the instance. The mapped buffer is handed over for release
     * when {@link #close()} is first called.
     *
     * @param mappedBuffer         for the {@link MarkFile} fields.
     * @param versionFieldOffset   for the version field.
     * @param timestampFieldOffset for the timestamp field.
     */
    public MarkFile(final MappedByteBuffer mappedBuffer, final int versionFieldOffset, final int timestampFieldOffset)
    {
        validateOffsets(versionFieldOffset, timestampFieldOffset);

        this.versionFieldOffset = versionFieldOffset;
        this.timestampFieldOffset = timestampFieldOffset;
        this.mappedBuffer = mappedBuffer;
        this.buffer = new UnsafeBuffer(mappedBuffer);

        // not backed by a file known to this instance
        this.markFile = null;
        this.parentDir = null;
    }

    /**
     * Manage a {@link MarkFile} on top of an existing buffer using the given version and timestamp offsets.
     * <p>
     * No file, parent directory or mapped buffer is associated with the instance, so {@link #close()} has nothing
     * to release.
     *
     * @param buffer               for the {@link MarkFile} fields
     * @param versionFieldOffset   for the version field
     * @param timestampFieldOffset for the timestamp field
     */
    public MarkFile(final UnsafeBuffer buffer, final int versionFieldOffset, final int timestampFieldOffset)
    {
        validateOffsets(versionFieldOffset, timestampFieldOffset);

        this.versionFieldOffset = versionFieldOffset;
        this.timestampFieldOffset = timestampFieldOffset;
        this.buffer = buffer;

        // the caller owns the underlying memory, nothing is mapped or tracked here
        this.mappedBuffer = null;
        this.markFile = null;
        this.parentDir = null;
    }

    /**
     * Has {@link #close()} already been called on this {@link MarkFile}?
     *
     * @return {@code true} once the {@link MarkFile} has been closed, otherwise {@code false}.
     */
    public boolean isClosed()
    {
        final boolean closed = this.isClosed.get();
        return closed;
    }

    /**
     * Release the mapped buffer the first time this method is called; any further call is a no-op.
     */
    public void close()
    {
        if (!isClosed.compareAndSet(false, true))
        {
            // somebody else has already closed this instance
            return;
        }

        BufferUtil.free(mappedBuffer);
    }

    /**
     * Publish the given version into the version field using a release (ordered) put.
     *
     * @param version to be signaled.
     */
    public void signalReady(final int version)
    {
        final int offset = this.versionFieldOffset;
        this.buffer.putIntRelease(offset, version);
    }

    /**
     * Read the version field with volatile semantics.
     *
     * @return current value of the version field.
     */
    public int versionVolatile()
    {
        final int version = this.buffer.getIntVolatile(this.versionFieldOffset);
        return version;
    }

    /**
     * Read the version field with plain (weak) semantics.
     *
     * @return current value of the version field.
     */
    public int versionWeak()
    {
        final int version = this.buffer.getInt(this.versionFieldOffset);
        return version;
    }

    /**
     * Store the timestamp field using an ordered put.
     * <p>
     * This simply delegates to {@link #timestampRelease(long)}, which is the preferred method to call.
     *
     * @param timestamp to be set.
     */
    public void timestampOrdered(final long timestamp)
    {
        this.timestampRelease(timestamp);
    }

    /**
     * Store the timestamp field using a release put.
     *
     * @param timestamp to be set.
     */
    public void timestampRelease(final long timestamp)
    {
        final int offset = this.timestampFieldOffset;
        this.buffer.putLongRelease(offset, timestamp);
    }

    /**
     * Read the timestamp field with volatile semantics.
     *
     * @return current value of the timestamp field.
     */
    public long timestampVolatile()
    {
        final long timestamp = this.buffer.getLongVolatile(this.timestampFieldOffset);
        return timestamp;
    }

    /**
     * Read the timestamp field with plain (weak) semantics.
     *
     * @return current value of the timestamp field.
     */
    public long timestampWeak()
    {
        final long timestamp = this.buffer.getLong(this.timestampFieldOffset);
        return timestamp;
    }

    /**
     * Delete the parent directory of the mark file.
     * <p>
     * Note that the parent directory is {@code null} for instances constructed directly from a buffer; the value
     * is passed through to {@link IoUtil#delete(File, boolean)} unchecked.
     *
     * @param ignoreFailures should the failures be silently ignored.
     */
    public void deleteDirectory(final boolean ignoreFailures)
    {
        final File directory = this.parentDir;
        IoUtil.delete(directory, ignoreFailures);
    }

    /**
     * Directory which contains the mark file.
     *
     * @return parent directory, or {@code null} if the instance was constructed directly from a buffer.
     */
    public File parentDirectory()
    {
        return this.parentDir;
    }

    /**
     * The {@link File} which backs this {@link MarkFile}.
     *
     * @return the mark file, or {@code null} if the instance was constructed directly from a buffer.
     */
    public File markFile()
    {
        return this.markFile;
    }

    /**
     * The {@link MappedByteBuffer} underlying this {@link MarkFile}.
     *
     * @return reference to the {@link MappedByteBuffer}, or {@code null} if the instance was constructed from an
     * {@link UnsafeBuffer}.
     */
    public MappedByteBuffer mappedByteBuffer()
    {
        return this.mappedBuffer;
    }

    /**
     * The {@link UnsafeBuffer} through which the version and timestamp fields are accessed.
     *
     * @return reference to the {@link UnsafeBuffer}.
     */
    public UnsafeBuffer buffer()
    {
        return this.buffer;
    }

    /**
     * Ensure the directory exists: it is created when missing, and deleted then re-created when already present.
     *
     * @param directory             to create.
     * @param filename              of the {@link MarkFile}.
     * @param warnIfDirectoryExists should print warning if directory already exists.
     * @param dirDeleteOnStart      should directory be deleted if it already exists. When the flag is set to
     *                              {@code false} the check will be made to see if the {@link MarkFile} is active.
     *                              <p>Note: the directory will be deleted anyway even if the flag is {@code false}.
     * @param versionFieldOffset    offset of the version field.
     * @param timestampFieldOffset  offset of the timestamp field.
     * @param timeoutMs             timeout in milliseconds.
     * @param epochClock            epoch clock.
     * @param versionCheck          {@link MarkFile} version check function.
     * @param logger                to use for reporting warnings.
     * @throws IllegalStateException if {@link MarkFile} already exists and is active and
     *                               {@code dirDeleteOnStart=false}.
     */
    public static void ensureDirectoryExists(
        final File directory,
        final String filename,
        final boolean warnIfDirectoryExists,
        final boolean dirDeleteOnStart,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        final File markFile = new File(directory, filename);

        if (!directory.isDirectory())
        {
            // nothing to inspect or clean up, just create it
            IoUtil.ensureDirectoryExists(directory, directory.toString());
            return;
        }

        if (null != logger && warnIfDirectoryExists)
        {
            logger.accept("WARNING: " + directory + " already exists.");
        }

        if (!dirDeleteOnStart)
        {
            // map only the region spanning both fields and address them relative to the start of that region
            final int regionOffset = Math.min(versionFieldOffset, timestampFieldOffset);
            final int regionEnd = Math.max(versionFieldOffset, timestampFieldOffset) + SIZE_OF_LONG;
            final int regionLength = regionEnd - regionOffset;
            final MappedByteBuffer region = mapExistingFile(markFile, logger, regionOffset, regionLength);

            try
            {
                final boolean active = isActive(
                    region,
                    epochClock,
                    timeoutMs,
                    versionFieldOffset - regionOffset,
                    timestampFieldOffset - regionOffset,
                    versionCheck,
                    logger);

                if (active)
                {
                    throw new IllegalStateException("active mark file detected: " + markFile);
                }
            }
            finally
            {
                BufferUtil.free(region);
            }
        }

        IoUtil.delete(directory, false);
        IoUtil.ensureDirectoryExists(directory, directory.toString());
    }

    /**
     * Wait for the {@link MarkFile} to be large enough to hold an int and a long and then map the whole file.
     * <p>
     * The file is re-opened on every attempt and is closed again before sleeping between attempts.
     *
     * @param logger     to use for warnings.
     * @param markFile   the {@link MarkFile}.
     * @param deadlineMs deadline timeout in milliseconds.
     * @param epochClock epoch clock.
     * @return {@link MappedByteBuffer} for the {@link MarkFile}.
     * @throws IllegalStateException if deadline timeout is reached.
     * @throws UncheckedIOException  in case of I/O errors.
     */
    @SuppressWarnings("try")
    public static MappedByteBuffer waitForFileMapping(
        final Consumer<String> logger, final File markFile, final long deadlineMs, final EpochClock epochClock)
    {
        final int minimumLength = SIZE_OF_INT + SIZE_OF_LONG;

        while (true)
        {
            try (FileChannel channel = FileChannel.open(markFile.toPath(), READ, WRITE))
            {
                final long length = channel.size();
                if (length >= minimumLength)
                {
                    if (null != logger)
                    {
                        logger.accept("INFO: mark file exists: " + markFile);
                    }

                    return channel.map(READ_WRITE, 0, length);
                }

                if (epochClock.time() > deadlineMs)
                {
                    throw new IllegalStateException("mark file is created but not populated: " + markFile);
                }
            }
            catch (final IOException ex)
            {
                throw new UncheckedIOException("cannot open mark file", ex);
            }

            // the channel has been closed by now, back off before taking another look at the file
            sleep(16);
        }
    }

    /**
     * Map existing {@link MarkFile}.
     * <p>
     * Waits, bounded by a single deadline derived from {@code timeoutMs}, for the file to exist with sufficient
     * length, then for the version field to become non-zero, and finally for the timestamp field to become
     * non-zero. The version is passed to {@code versionCheck} as soon as it is non-zero.
     * <p>
     * The mapping is released again if any of the checks fail.
     *
     * @param markFile             the {@link MarkFile}.
     * @param versionFieldOffset   offset of the version field.
     * @param timestampFieldOffset offset of the timestamp field.
     * @param timeoutMs            timeout in milliseconds.
     * @param epochClock           epoch clock.
     * @param versionCheck         version check function.
     * @param logger               for the warnings.
     * @return {@link MappedByteBuffer} for the {@link MarkFile}.
     * @throws IllegalStateException if timeout is reached.
     * @throws IllegalStateException if {@link MarkFile} has wrong size.
     * @throws UncheckedIOException  in case of I/O errors.
     */
    public static MappedByteBuffer mapExistingMarkFile(
        final File markFile,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        final long deadlineMs = epochClock.time() + timeoutMs;

        while (!markFile.exists() || markFile.length() < (timestampFieldOffset + SIZE_OF_LONG))
        {
            if (epochClock.time() > deadlineMs)
            {
                throw new IllegalStateException("mark file not created: " + markFile);
            }

            sleep(16);
        }

        final MappedByteBuffer byteBuffer = waitForFileMapping(logger, markFile, deadlineMs, epochClock);
        try
        {
            if (byteBuffer.capacity() < (timestampFieldOffset + SIZE_OF_LONG))
            {
                throw new IllegalStateException("mark file mapping is too small: capacity=" + byteBuffer.capacity() +
                    ", file=" + markFile);
            }

            final UnsafeBuffer buffer = new UnsafeBuffer(byteBuffer);
            int version;
            while (0 == (version = buffer.getIntVolatile(versionFieldOffset)))
            {
                if (epochClock.time() > deadlineMs)
                {
                    throw new IllegalStateException("mark file is created but not initialised: " + markFile);
                }

                sleep(1);
            }

            versionCheck.accept(version);

            while (0 == buffer.getLongVolatile(timestampFieldOffset))
            {
                if (epochClock.time() > deadlineMs)
                {
                    throw new IllegalStateException("no non-zero timestamp detected: " + markFile);
                }

                sleep(1);
            }
        }
        catch (final Throwable ex)
        {
            // release the mapping on any failure, including Errors raised by the version check, and rethrow as is
            BufferUtil.free(byteBuffer);
            throw ex;
        }

        return byteBuffer;
    }

    /**
     * Map new or existing {@link MarkFile} atomically, i.e. allowing only a single process to succeed.
     * <ul>
     *     <li>{@code shouldPreExist == false} then an attempt to atomically
     *     create a new mark file is made. If creation fails then operation is aborted.</li>
     *     <li>{@code shouldPreExist == true} then open it for read and write access and if
     *     that fails abort. Once file is opened perform version validation using provided
     *     {@code versionCheck} function. Additionally, use the activity timestamp (i.e. value at
     *     {@code timestampFieldOffset}) to verify that the mark file is not currently in use. Finally, before returning
     *     from this method a special sentinel value {@link #ACTIVATION_IN_PROGRESS_TIMESTAMP} is atomically set into
     *     the {@code timestampFieldOffset} field to prevent concurrent activation.</li>
     * </ul>
     * <p>
     * The mapping is released again if any of the checks fail.
     *
     * @param markFile             the {@link MarkFile}.
     * @param shouldPreExist       should {@link MarkFile} already exist. If {@code false} is specified it will attempt
     *                             to atomically create a new file.
     * @param versionFieldOffset   offset of the version field.
     * @param timestampFieldOffset offset of the timestamp field.
     * @param totalFileLength      total file length to be mapped.
     * @param timeoutMs            timeout in milliseconds.
     * @param epochClock           epoch clock.
     * @param versionCheck         version check function.
     * @param logger               for the warnings.
     * @return {@link MappedByteBuffer} for the {@link MarkFile}.
     * @throws IllegalStateException if {@code shouldPreExist == true} and the mapping is too short to hold the
     *                               timestamp field.
     * @throws IllegalStateException if {@code shouldPreExist == true} and the version validation fails.
     * @throws IllegalStateException if {@code shouldPreExist == true} and an active process using the mark file is
     *                               detected.
     * @throws IllegalStateException if {@code shouldPreExist == true} and another activation wins the race.
     * @throws UncheckedIOException  in case of I/O errors, including failure to create a new file.
     * @throws NullPointerException if {@code null == markFile || null == epochClock || null == versionCheck}.
     * @see #ACTIVATION_IN_PROGRESS_TIMESTAMP
     */
    public static MappedByteBuffer mapNewOrExistingMarkFile(
        final File markFile,
        final boolean shouldPreExist,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final long totalFileLength,
        final long timeoutMs,
        final EpochClock epochClock,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        Objects.requireNonNull(epochClock, "epochClock must not be null");
        Objects.requireNonNull(versionCheck, "versionCheck must not be null");
        Objects.requireNonNull(markFile, "markFile must not be null");
        MappedByteBuffer byteBuffer = null;

        // CREATE_NEW makes creation fail if the file is already present, which is what provides the exclusivity
        final EnumSet<StandardOpenOption> openOptions = shouldPreExist ?
            EnumSet.of(READ, WRITE) : EnumSet.of(CREATE_NEW, SPARSE, READ, WRITE);

        try (FileChannel channel = FileChannel.open(markFile.toPath(), openOptions))
        {
            byteBuffer = channel.map(READ_WRITE, 0, totalFileLength);
            final UnsafeBuffer buffer = new UnsafeBuffer(byteBuffer);

            if (shouldPreExist)
            {
                if (buffer.capacity() < (timestampFieldOffset + SIZE_OF_LONG))
                {
                    throw new IllegalStateException("active mark file too short: capacity=" + buffer.capacity() +
                        " < " + (timestampFieldOffset + SIZE_OF_LONG) + ", file=" + markFile);
                }

                final int version = buffer.getIntVolatile(versionFieldOffset);

                if (null != logger)
                {
                    logger.accept("INFO: mark file exists: " + markFile);
                }

                versionCheck.accept(version);

                final long timestampMs = buffer.getLongVolatile(timestampFieldOffset);
                final long timestampAgeMs = epochClock.time() - timestampMs;

                if (null != logger)
                {
                    logger.accept("INFO: heartbeat timestampMs=" + timestampMs + " ageMs=" + timestampAgeMs);
                }

                if (timestampAgeMs < timeoutMs)
                {
                    throw new IllegalStateException("active mark file detected: " + markFile);
                }

                // prevent concurrent activations from multiple processes and ensure that further attempts are rejected
                // while the current activation is still in progress
                if (!buffer.compareAndSetLong(timestampFieldOffset, timestampMs, ACTIVATION_IN_PROGRESS_TIMESTAMP))
                {
                    throw new IllegalStateException("concurrent mark file activation: " + markFile);
                }
            }
        }
        catch (final IOException ex)
        {
            // the mapping may already exist here, e.g. if closing the channel failed after a successful map
            if (null != byteBuffer)
            {
                BufferUtil.free(byteBuffer);
            }
            throw new UncheckedIOException(
                shouldPreExist ? "failed to open existing mark file: " + markFile :
                    "failed to create new mark file: " + markFile,
                ex);
        }
        catch (final RuntimeException | Error ex)
        {
            // do not leak the mapping when validation fails, regardless of the kind of failure
            if (null != byteBuffer)
            {
                BufferUtil.free(byteBuffer);
            }
            throw ex;
        }

        return byteBuffer;
    }

    /**
     * Map a region of the {@link MarkFile} if the file exists.
     *
     * @param markFile the {@link MarkFile}.
     * @param logger   for the warnings.
     * @param offset   offset to map at.
     * @param length   to map.
     * @return {@link MappedByteBuffer} for the {@link MarkFile} or {@code null} if the file does not exist.
     */
    public static MappedByteBuffer mapExistingFile(
        final File markFile, final Consumer<String> logger, final long offset, final long length)
    {
        if (!markFile.exists())
        {
            return null;
        }

        if (null != logger)
        {
            logger.accept("INFO: mark file exists: " + markFile);
        }

        final String description = markFile.toString();
        return IoUtil.mapExistingFile(markFile, description, offset, length);
    }

    /**
     * Check if {@link MarkFile} is active, i.e. still in use.
     * <p>
     * A {@code null} buffer is treated as not active. Otherwise the version field is awaited until it is non-zero
     * (bounded by the timeout), validated, and the mark file is considered active when the age of its timestamp
     * does not exceed the timeout.
     *
     * @param byteBuffer           the {@link MappedByteBuffer}.
     * @param epochClock           epoch clock.
     * @param timeoutMs            timeout in milliseconds.
     * @param versionFieldOffset   offset of the version field.
     * @param timestampFieldOffset offset of the timestamp field.
     * @param versionCheck         version check function.
     * @param logger               for the warnings.
     * @return {@code true} if {@link MarkFile} is active.
     * @throws IllegalStateException if the version field is still zero when the timeout expires.
     */
    public static boolean isActive(
        final MappedByteBuffer byteBuffer,
        final EpochClock epochClock,
        final long timeoutMs,
        final int versionFieldOffset,
        final int timestampFieldOffset,
        final IntConsumer versionCheck,
        final Consumer<String> logger)
    {
        if (null == byteBuffer)
        {
            return false;
        }

        final UnsafeBuffer fields = new UnsafeBuffer(byteBuffer);
        final long deadlineMs = epochClock.time() + timeoutMs;

        // a zero version means the owner has not finished initialising the file yet
        int version = fields.getIntVolatile(versionFieldOffset);
        while (0 == version)
        {
            if (epochClock.time() > deadlineMs)
            {
                throw new IllegalStateException("mark file is created but not initialised");
            }

            sleep(1);
            version = fields.getIntVolatile(versionFieldOffset);
        }

        versionCheck.accept(version);

        final long timestampMs = fields.getLongVolatile(timestampFieldOffset);
        final long timestampAgeMs = epochClock.time() - timestampMs;

        if (null != logger)
        {
            logger.accept("INFO: heartbeat timestampMs=" + timestampMs + " ageMs=" + timestampAgeMs);
        }

        return timestampAgeMs <= timeoutMs;
    }

    /**
     * Ensure a link file exists if required for the actual mark file. A link file will contain the pathname of the
     * actual mark file's parent directory. This is useful if the mark file should be stored on a different storage
     * medium to the directory of the service. This will create a file with name of {@code linkFilename} in the {@code
     * serviceDir}. If {@code actualFile} is an immediate child of {@code serviceDir} then any file with the name of
     * {@code linkFilename} will be deleted from the {@code serviceDir} (so that links won't be present if not
     * required).
     * <p>
     * Directories are compared by canonical path. An {@code actualFile} given as a bare relative filename is
     * resolved against the current working directory.
     *
     * @param serviceDir   directory where the mark file would normally be stored (e.g. archiveDir, clusterDir).
     * @param actualFile   location of actual mark file, e.g. /dev/shm/service/node0/archive-mark.dat
     * @param linkFilename short name that should be used for the link file, e.g. archive-mark.lnk
     * @throws IllegalArgumentException if a canonical path cannot be resolved, the underlying
     *                                  {@link IOException} is attached as the cause.
     * @throws RuntimeException         if the link file cannot be removed or written.
     */
    public static void ensureMarkFileLink(final File serviceDir, final File actualFile, final String linkFilename)
    {
        final String serviceDirPath;
        final String markFileParentPath;

        try
        {
            serviceDirPath = serviceDir.getCanonicalPath();
        }
        catch (final IOException ex)
        {
            throw new IllegalArgumentException(
                "failed to resolve canonical path for archiveDir=" + serviceDir, ex);
        }

        try
        {
            // make the path absolute first, otherwise a bare relative filename has no parent to resolve
            markFileParentPath = actualFile.getAbsoluteFile().getParentFile().getCanonicalPath();
        }
        catch (final IOException ex)
        {
            throw new IllegalArgumentException(
                "failed to resolve canonical path for markFile parent dir of " + actualFile, ex);
        }

        final Path linkFile = new File(serviceDirPath, linkFilename).toPath();
        if (serviceDirPath.equals(markFileParentPath))
        {
            // the mark file lives in the service directory so a stale link would only be misleading
            try
            {
                Files.deleteIfExists(linkFile);
            }
            catch (final IOException ex)
            {
                throw new RuntimeException("failed to remove old link file", ex);
            }
        }
        else
        {
            try
            {
                Files.write(
                    linkFile,
                    markFileParentPath.getBytes(US_ASCII),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING);
            }
            catch (final IOException ex)
            {
                throw new RuntimeException("failed to create link for mark file directory", ex);
            }
        }
    }

    /**
     * Sleep for the given duration. If the thread is interrupted while sleeping the method returns early and the
     * interrupted status of the thread is restored.
     *
     * @param durationMs sleep duration in milliseconds.
     */
    protected static void sleep(final long durationMs)
    {
        try
        {
            Thread.sleep(durationMs);
        }
        catch (final InterruptedException ex)
        {
            // re-assert the flag so that callers can still observe the interruption
            final Thread current = Thread.currentThread();
            current.interrupt();
        }
    }

    /**
     * Verify that the version field lies entirely before the timestamp field.
     *
     * @param versionFieldOffset   offset of the version field.
     * @param timestampFieldOffset offset of the timestamp field.
     * @throws IllegalArgumentException if the timestamp field starts before the end of the version field.
     */
    private static void validateOffsets(final int versionFieldOffset, final int timestampFieldOffset)
    {
        // widened to long so that an offset close to Integer.MAX_VALUE cannot wrap negative and slip through
        final long versionFieldEnd = (long)versionFieldOffset + SIZE_OF_INT;
        if (versionFieldEnd > timestampFieldOffset)
        {
            throw new IllegalArgumentException("version field must precede the timestamp field");
        }
    }
}
